/*
 * Copyright 2009-2013 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hivesterix.serde.lazy;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.RecordInfo;
import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
import edu.uci.ics.hivesterix.serde.lazy.objectinspector.LazyMapObjectInspector;

/**
 * LazyMap is serialized as follows: start A b c b c b c end bytes[] ->
 * |--------|---|---|---|---| ... |---|---|
 * Section A is the null-bytes. Suppose the map has N key-value pairs, then
 * there are (N*2+7)/8 bytes used as null-bytes. Each bit corresponds to a key
 * or a value and it indicates whether that key or value is null (0) or not null
 * (1).
 * After A, all the bytes are actual serialized data of the map, which are
 * key-value pairs. b represent the keys and c represent the values. Each of
 * them is again a LazyObject.
 */

@SuppressWarnings("rawtypes")
public class LazyMap extends LazyNonPrimitive<LazyMapObjectInspector> {

    private static Log LOG = LogFactory.getLog(LazyMap.class.getName());

    /**
     * Whether the data is already parsed or not.
     */
    boolean parsed;

    /**
     * The size of the map. Only valid when the data is parsed. -1 when the map
     * is NULL.
     */
    int mapSize = 0;

    /**
     * The beginning position and length of key[i] and value[i]. Only valid when
     * the data is parsed.
     */
    int[] keyStart;
    int[] keyLength;
    int[] valueStart;
    int[] valueLength;
    /**
     * Whether valueObjects[i]/keyObjects[i] is initialized or not.
     */
    boolean[] keyInited;
    boolean[] valueInited;

    /**
     * Whether valueObjects[i]/keyObjects[i] is null or not This could not be
     * inferred from the length of the object. In particular, a 0-length string
     * is not null.
     */
    boolean[] keyIsNull;
    boolean[] valueIsNull;

    /**
     * The keys are stored in an array of LazyPrimitives.
     */
    LazyPrimitive<?, ?>[] keyObjects;
    /**
     * The values are stored in an array of LazyObjects. value[index] will start
     * from KeyEnd[index] + 1, and ends before KeyStart[index+1] - 1.
     */
    LazyObject[] valueObjects;

    protected LazyMap(LazyMapObjectInspector oi) {
        super(oi);
    }

    /**
     * Set the row data for this LazyMap.
     * 
     * @see LazyObject#init(ByteArrayRef, int, int)
     */
    @Override
    public void init(byte[] bytes, int start, int length) {
        super.init(bytes, start, length);
        parsed = false;
    }

    /**
     * Adjust the size of arrays: keyStart, keyLength valueStart, valueLength
     * keyInited, keyIsNull valueInited, valueIsNull.
     */
    protected void adjustArraySize(int newSize) {
        if (keyStart == null || keyStart.length < newSize) {
            keyStart = new int[newSize];
            keyLength = new int[newSize];
            valueStart = new int[newSize];
            valueLength = new int[newSize];
            keyInited = new boolean[newSize];
            keyIsNull = new boolean[newSize];
            valueInited = new boolean[newSize];
            valueIsNull = new boolean[newSize];
            keyObjects = new LazyPrimitive<?, ?>[newSize];
            valueObjects = new LazyObject[newSize];
        }
    }

    boolean nullMapKey = false;
    VInt vInt = new LazyUtils.VInt();
    RecordInfo recordInfo = new LazyUtils.RecordInfo();

    /**
     * Parse the byte[] and fill keyStart, keyLength, keyIsNull valueStart,
     * valueLength and valueIsNull.
     */
    private void parse() {

        // get the VInt that represents the map size
        LazyUtils.readVInt(bytes, start, vInt);
        mapSize = vInt.value;
        if (0 == mapSize) {
            parsed = true;
            return;
        }

        // adjust arrays
        adjustArraySize(mapSize);

        // find out the null-bytes
        int mapByteStart = start + vInt.length;
        int nullByteCur = mapByteStart;
        int nullByteEnd = mapByteStart + (mapSize * 2 + 7) / 8;
        int lastElementByteEnd = nullByteEnd;

        // parsing the keys and values one by one
        for (int i = 0; i < mapSize; i++) {
            // parse a key
            keyIsNull[i] = true;
            if ((bytes[nullByteCur] & (1 << ((i * 2) % 8))) != 0) {
                keyIsNull[i] = false;
                LazyUtils.checkObjectByteInfo(((MapObjectInspector) oi).getMapKeyObjectInspector(), bytes,
                        lastElementByteEnd, recordInfo);
                keyStart[i] = lastElementByteEnd + recordInfo.elementOffset;
                keyLength[i] = recordInfo.elementSize;
                lastElementByteEnd = keyStart[i] + keyLength[i];
            } else if (!nullMapKey) {
                nullMapKey = true;
                LOG.warn("Null map key encountered! Ignoring similar problems.");
            }

            // parse a value
            valueIsNull[i] = true;
            if ((bytes[nullByteCur] & (1 << ((i * 2 + 1) % 8))) != 0) {
                valueIsNull[i] = false;
                LazyUtils.checkObjectByteInfo(((MapObjectInspector) oi).getMapValueObjectInspector(), bytes,
                        lastElementByteEnd, recordInfo);
                valueStart[i] = lastElementByteEnd + recordInfo.elementOffset;
                valueLength[i] = recordInfo.elementSize;
                lastElementByteEnd = valueStart[i] + valueLength[i];
            }

            // move onto the next null byte
            if (3 == (i % 4)) {
                nullByteCur++;
            }
        }

        Arrays.fill(keyInited, 0, mapSize, false);
        Arrays.fill(valueInited, 0, mapSize, false);
        parsed = true;
    }

    /**
     * Get the value object with the index without checking parsed.
     * 
     * @param index
     *            The index into the array starting from 0
     */
    private LazyObject uncheckedGetValue(int index) {
        if (valueIsNull[index]) {
            return null;
        }
        if (!valueInited[index]) {
            valueInited[index] = true;
            if (valueObjects[index] == null) {
                valueObjects[index] = LazyFactory.createLazyObject(((MapObjectInspector) oi)
                        .getMapValueObjectInspector());
            }
            valueObjects[index].init(bytes, valueStart[index], valueLength[index]);
        }
        return valueObjects[index];
    }

    /**
     * Get the value in the map for the key.
     * If there are multiple matches (which is possible in the serialized
     * format), only the first one is returned.
     * The most efficient way to get the value for the key is to serialize the
     * key and then try to find it in the array. We do linear search because in
     * most cases, user only wants to get one or two values out of the map, and
     * the cost of building up a HashMap is substantially higher.
     * 
     * @param key
     *            The key object that we are looking for.
     * @return The corresponding value object, or NULL if not found
     */
    public Object getMapValueElement(Object key) {
        if (!parsed) {
            parse();
        }
        // search for the key
        for (int i = 0; i < mapSize; i++) {
            LazyPrimitive<?, ?> lazyKeyI = uncheckedGetKey(i);
            if (lazyKeyI == null) {
                continue;
            }
            // getWritableObject() will convert LazyPrimitive to actual
            // primitive
            // writable objects.
            Object keyI = lazyKeyI.getWritableObject();
            if (keyI == null) {
                continue;
            }
            if (keyI.equals(key)) {
                // Got a match, return the value
                LazyObject v = uncheckedGetValue(i);
                return v == null ? v : v.getObject();
            }
        }
        return null;
    }

    /**
     * Get the key object with the index without checking parsed.
     * 
     * @param index
     *            The index into the array starting from 0
     */
    private LazyPrimitive<?, ?> uncheckedGetKey(int index) {
        if (keyIsNull[index]) {
            return null;
        }
        if (!keyInited[index]) {
            keyInited[index] = true;
            if (keyObjects[index] == null) {
                // Keys are always primitive
                keyObjects[index] = LazyFactory
                        .createLazyPrimitiveClass((PrimitiveObjectInspector) ((MapObjectInspector) oi)
                                .getMapKeyObjectInspector());
            }
            keyObjects[index].init(bytes, keyStart[index], keyLength[index]);
        }
        return keyObjects[index];
    }

    /**
     * cachedMap is reused for different calls to getMap(). But each LazyMap has
     * a separate cachedMap so we won't overwrite the data by accident.
     */
    LinkedHashMap<Object, Object> cachedMap;

    /**
     * Return the map object representing this LazyMap. Note that the keyObjects
     * will be Writable primitive objects.
     * 
     * @return the map object
     */
    public Map<Object, Object> getMap() {
        if (!parsed) {
            parse();
        }
        if (cachedMap == null) {
            // Use LinkedHashMap to provide deterministic order
            cachedMap = new LinkedHashMap<Object, Object>();
        } else {
            cachedMap.clear();
        }

        // go through each element of the map
        for (int i = 0; i < mapSize; i++) {
            LazyPrimitive<?, ?> lazyKey = uncheckedGetKey(i);
            if (lazyKey == null) {
                continue;
            }
            Object key = lazyKey.getObject();
            // do not overwrite if there are duplicate keys
            if (key != null && !cachedMap.containsKey(key)) {
                LazyObject lazyValue = uncheckedGetValue(i);
                Object value = (lazyValue == null ? null : lazyValue.getObject());
                cachedMap.put(key, value);
            }
        }
        return cachedMap;
    }

    /**
     * Get the size of the map represented by this LazyMap.
     * 
     * @return The size of the map
     */
    public int getMapSize() {
        if (!parsed) {
            parse();
        }
        return mapSize;
    }
}
